/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.core;



import com.bff.gaia.unified.sdk.state.TimeDomain;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.util.WindowTracing;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.HashBasedTable;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Table;

import org.joda.time.Instant;



import javax.annotation.Nullable;

import java.util.NavigableSet;

import java.util.NoSuchElementException;

import java.util.TreeSet;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.*;



/**
 * {@link TimerInternals} with all watermarks and processing clock simulated in-memory.
 *
 * <p>Timers are tracked twice: in a table keyed by namespace and timer ID (so that setting a timer
 * again replaces the previous one), and in one ordered set per {@link TimeDomain} (so that the
 * earliest pending timer can be found and removed). The clocks and watermarks only move when one
 * of the {@code advance*} methods is called; advancing a clock does not itself fire timers.
 * Eligible timers are handed out one at a time by the {@code removeNext*} methods.
 *
 * <p>This class performs no synchronization of its own.
 */
public class InMemoryTimerInternals implements TimerInternals {

  /** The current set timers by namespace and ID. */
  Table<StateNamespace, String, TimerData> existingTimers = HashBasedTable.create();

  /** Pending input watermark timers, in timestamp order. */
  private final NavigableSet<TimerData> watermarkTimers = new TreeSet<>();

  /** Pending processing time timers, in timestamp order. */
  private final NavigableSet<TimerData> processingTimers = new TreeSet<>();

  /** Pending synchronized processing time timers, in timestamp order. */
  private final NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();

  /** Current input watermark. */
  private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;

  /** Current output watermark; {@code null} until {@link #advanceOutputWatermark} is called. */
  @Nullable
  private Instant outputWatermarkTime = null;

  /** Current processing time. */
  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;

  /** Current synchronized processing time. */
  private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;

  /**
   * Returns the current output watermark, or {@code null} if it has never been advanced.
   */
  @Override
  @Nullable
  public Instant currentOutputWatermarkTime() {
    return outputWatermarkTime;
  }

  /**
   * Returns when the next timer in the given time domain will fire, or {@code null} if there are no
   * timers scheduled in that time domain.
   *
   * @param domain the time domain whose pending timers are inspected
   * @return the timestamp of the first pending timer in that domain, or {@code null} if none
   * @throws IllegalArgumentException if {@code domain} is not a known time domain
   */
  @Nullable
  public Instant getNextTimer(TimeDomain domain) {
    NavigableSet<TimerData> timers = timersForDomain(domain);
    // Check for emptiness up front instead of catching NoSuchElementException from first().
    return timers.isEmpty() ? null : timers.first().getTimestamp();
  }

  /**
   * Returns the live (mutable) set of pending timers for the given time domain.
   *
   * @throws IllegalArgumentException if {@code domain} is not a known time domain
   */
  private NavigableSet<TimerData> timersForDomain(TimeDomain domain) {
    switch (domain) {
      case EVENT_TIME:
        return watermarkTimers;
      case PROCESSING_TIME:
        return processingTimers;
      case SYNCHRONIZED_PROCESSING_TIME:
        return synchronizedProcessingTimers;
      default:
        throw new IllegalArgumentException("Unexpected time domain: " + domain);
    }
  }

  /**
   * Sets a timer identified by {@code namespace} and {@code timerId}, replacing any timer already
   * set under the same namespace and ID.
   *
   * @throws IllegalArgumentException if a timer with the same namespace and ID is already set in a
   *     different time domain
   */
  @Override
  public void setTimer(
      StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
    setTimer(TimerData.of(timerId, namespace, target, timeDomain));
  }

  /**
   * Sets the given timer. If a timer with the same namespace and ID already exists it must be in
   * the same time domain; it is replaced only when the timestamps differ.
   *
   * @throws IllegalArgumentException if the existing timer is in a different time domain
   * @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}.
   */
  @Deprecated
  @Override
  public void setTimer(TimerData timerData) {
    WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);

    @Nullable
    TimerData existing = existingTimers.get(timerData.getNamespace(), timerData.getTimerId());
    if (existing == null) {
      existingTimers.put(timerData.getNamespace(), timerData.getTimerId(), timerData);
      timersForDomain(timerData.getDomain()).add(timerData);
    } else {
      checkArgument(
          timerData.getDomain().equals(existing.getDomain()),
          "Attempt to set %s for time domain %s, but it is already set for time domain %s",
          timerData.getTimerId(),
          timerData.getDomain(),
          existing.getDomain());

      if (!timerData.getTimestamp().equals(existing.getTimestamp())) {
        // Swap the old entry for the new one in both the ordered set and the lookup table.
        NavigableSet<TimerData> timers = timersForDomain(timerData.getDomain());
        timers.remove(existing);
        timers.add(timerData);
        existingTimers.put(timerData.getNamespace(), timerData.getTimerId(), timerData);
      }
    }
  }

  /**
   * Not supported by this implementation.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
    throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
  }

  /**
   * Deletes the timer currently set under the given namespace and ID, if any.
   *
   * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}.
   */
  @Deprecated
  @Override
  public void deleteTimer(StateNamespace namespace, String timerId) {
    TimerData existing = existingTimers.get(namespace, timerId);
    if (existing != null) {
      deleteTimer(existing);
    }
  }

  /**
   * Deletes the given timer: the table entry for its namespace and ID is removed, and the given
   * timer itself is removed from the pending set of its time domain.
   *
   * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}.
   */
  @Deprecated
  @Override
  public void deleteTimer(TimerData timer) {
    WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
    existingTimers.remove(timer.getNamespace(), timer.getTimerId());
    timersForDomain(timer.getDomain()).remove(timer);
  }

  /** Returns the current simulated processing time. */
  @Override
  public Instant currentProcessingTime() {
    return processingTime;
  }

  /** Returns the current simulated synchronized processing time. */
  @Override
  @Nullable
  public Instant currentSynchronizedProcessingTime() {
    return synchronizedProcessingTime;
  }

  /** Returns the current simulated input watermark. */
  @Override
  public Instant currentInputWatermarkTime() {
    return inputWatermarkTime;
  }

  /** Returns a debug string listing the pending timers and every simulated clock. */
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(getClass())
        .add("watermarkTimers", watermarkTimers)
        .add("processingTimers", processingTimers)
        .add("synchronizedProcessingTimers", synchronizedProcessingTimers)
        .add("inputWatermarkTime", inputWatermarkTime)
        .add("outputWatermarkTime", outputWatermarkTime)
        .add("processingTime", processingTime)
        .add("synchronizedProcessingTime", synchronizedProcessingTime)
        .toString();
  }

  /**
   * Advances input watermark to the given value.
   *
   * @param newInputWatermark the new input watermark; must not be before the current one
   * @throws NullPointerException if {@code newInputWatermark} is null
   * @throws IllegalStateException if the watermark would move backwards
   */
  public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
    checkNotNull(newInputWatermark);
    checkState(
        !newInputWatermark.isBefore(inputWatermarkTime),
        "Cannot move input watermark time backwards from %s to %s",
        inputWatermarkTime,
        newInputWatermark);
    WindowTracing.trace(
        "{}.advanceInputWatermark: from {} to {}",
        getClass().getSimpleName(),
        inputWatermarkTime,
        newInputWatermark);
    inputWatermarkTime = newInputWatermark;
  }

  /**
   * Advances output watermark to the given value. A value after the current input watermark is
   * clipped to the input watermark before being applied.
   *
   * @param newOutputWatermark the requested output watermark
   * @throws NullPointerException if {@code newOutputWatermark} is null
   * @throws IllegalStateException if the (possibly clipped) value is before the current output
   *     watermark
   */
  public void advanceOutputWatermark(Instant newOutputWatermark) {
    checkNotNull(newOutputWatermark);
    final Instant adjustedOutputWatermark;
    if (newOutputWatermark.isAfter(inputWatermarkTime)) {
      WindowTracing.trace(
          "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
          getClass().getSimpleName(),
          newOutputWatermark,
          inputWatermarkTime);
      adjustedOutputWatermark = inputWatermarkTime;
    } else {
      adjustedOutputWatermark = newOutputWatermark;
    }

    checkState(
        outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime),
        "Cannot move output watermark time backwards from %s to %s",
        outputWatermarkTime,
        adjustedOutputWatermark);
    WindowTracing.trace(
        "{}.advanceOutputWatermark: from {} to {}",
        getClass().getSimpleName(),
        outputWatermarkTime,
        adjustedOutputWatermark);
    outputWatermarkTime = adjustedOutputWatermark;
  }

  /**
   * Advances processing time to the given value.
   *
   * @param newProcessingTime the new processing time; must not be before the current one
   * @throws NullPointerException if {@code newProcessingTime} is null
   * @throws IllegalStateException if processing time would move backwards
   */
  public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
    checkNotNull(newProcessingTime);
    checkState(
        !newProcessingTime.isBefore(processingTime),
        "Cannot move processing time backwards from %s to %s",
        processingTime,
        newProcessingTime);
    WindowTracing.trace(
        "{}.advanceProcessingTime: from {} to {}",
        getClass().getSimpleName(),
        processingTime,
        newProcessingTime);
    processingTime = newProcessingTime;
  }

  /**
   * Advances synchronized processing time to the given value.
   *
   * @param newSynchronizedProcessingTime the new synchronized processing time; must not be before
   *     the current one
   * @throws NullPointerException if {@code newSynchronizedProcessingTime} is null
   * @throws IllegalStateException if synchronized processing time would move backwards
   */
  public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime)
      throws Exception {
    checkNotNull(newSynchronizedProcessingTime);
    // Messages name the synchronized clock explicitly so they cannot be mistaken for
    // diagnostics emitted by advanceProcessingTime.
    checkState(
        !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
        "Cannot move synchronized processing time backwards from %s to %s",
        synchronizedProcessingTime,
        newSynchronizedProcessingTime);
    WindowTracing.trace(
        "{}.advanceSynchronizedProcessingTime: from {} to {}",
        getClass().getSimpleName(),
        synchronizedProcessingTime,
        newSynchronizedProcessingTime);
    synchronizedProcessingTime = newSynchronizedProcessingTime;
  }

  /**
   * Removes and returns the next eligible event time timer, or returns {@code null} if there is
   * none. Eligibility is judged against the current input watermark.
   */
  @Nullable
  public TimerData removeNextEventTimer() {
    TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME);
    if (timer != null) {
      WindowTracing.trace(
          "{}.removeNextEventTimer: firing {} at {}",
          getClass().getSimpleName(),
          timer,
          inputWatermarkTime);
    }
    return timer;
  }

  /**
   * Removes and returns the next eligible processing time timer, or returns {@code null} if there
   * is none. Eligibility is judged against the current processing time.
   */
  @Nullable
  public TimerData removeNextProcessingTimer() {
    TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME);
    if (timer != null) {
      WindowTracing.trace(
          "{}.removeNextProcessingTimer: firing {} at {}",
          getClass().getSimpleName(),
          timer,
          processingTime);
    }
    return timer;
  }

  /**
   * Removes and returns the next eligible synchronized processing time timer, or returns
   * {@code null} if there is none. Eligibility is judged against the current synchronized
   * processing time.
   */
  @Nullable
  public TimerData removeNextSynchronizedProcessingTimer() {
    TimerData timer =
        removeNextTimer(synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
    if (timer != null) {
      WindowTracing.trace(
          "{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
          getClass().getSimpleName(),
          timer,
          synchronizedProcessingTime);
    }
    return timer;
  }

  /**
   * Removes the first pending timer of {@code domain} from both the pending set and the lookup
   * table, provided {@code currentTime} is strictly after that timer's timestamp.
   *
   * @return the removed timer, or {@code null} if the domain has no pending timer or the first one
   *     is not yet eligible
   */
  @Nullable
  private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
    NavigableSet<TimerData> timers = timersForDomain(domain);

    // A timer whose timestamp equals currentTime is NOT eligible: the check is strict.
    if (!timers.isEmpty() && currentTime.isAfter(timers.first().getTimestamp())) {
      TimerData timer = timers.pollFirst();
      existingTimers.remove(timer.getNamespace(), timer.getTimerId());
      return timer;
    } else {
      return null;
    }
  }
}